[WIP] refactor of dataset builder and executor #537

Open

wants to merge 57 commits into main

Conversation

cyruszhang (Collaborator)

Key elements of this PR:

  1. YAML explicitly defines the different dataset sources; local and remote datasets are defined separately (see the illustrative config sketch after this list)
  2. More flexible and open parameterized control of datasets; different parameters and the corresponding validation are supported per source, and hooks are left open for additional / fine-grained configuration
  3. Unbind the Executor's hardcoded dataset support (currently RayExecutor only accepts local JSON files, and this binding is hardcoded in the code); Executor/RayExecutor is no longer tied to a dataset input format, and whether a dataset can be loaded is decided by the formatter/downloader's declared support for the executor type
  4. Improve the extensibility of the Executor framework, making it easier to support other engines such as NeMo, Dask, and Spark
  5. Support dataset format validation
  6. Additional data source support
    a. Support ModelScope
    b. Support arXiv: download, decompress, and ingest
    c. Support Wikipedia: download, decompress, and ingest
    d. Support Common Crawl: download, decompress, and ingest
  7. Compatible with the current command-line dataset_path format
  8. Compatible with data mixture
  9. Compatible with the empty_formatter/generated_dataset_config path
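
A minimal sketch of what such an explicit YAML definition could look like; every field name below is an illustrative assumption rather than the schema introduced by this PR:

# illustrative sketch only; field names are assumptions, not the final schema
dataset:
  configs:
    # local source: path-based, with format-specific parameters
    - type: local
      path: path/to/local/data.jsonl
    # remote source: provider-specific parameters and validation
    - type: remote
      source: huggingface        # or modelscope / arxiv / wiki / commoncrawl
      path: org_name/dataset_name
      split: train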

design doc: https://aliyuque.antfin.com/yilei.z/cnk4dn/qomvqql62lyglrh2?singleDoc# "Refactoring Design of Dataset/Loader/Executor"

@cyruszhang cyruszhang removed the request for review from drcege February 7, 2025 20:39

# Validate conversation structure
for item in dataset:
    turns = self._parse_turns(item['text'])
Collaborator:

These classes are still in progress, right? Do they need to be updated or implemented later?

Collaborator:

The dataset format for conversations can be found here.

MAX_SAMPLE_SIZE = 1000
if isinstance(dataset, NestedDataset):
    sample_size = min(MAX_SAMPLE_SIZE, len(dataset))
    sample = dataset.select(range(sample_size))
Collaborator:

For an HF dataset, we can use the dataset.take(n) method to get the top-n samples more efficiently. Related doc
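
A rough sketch of that suggestion (hedged: Dataset.take is only available in newer versions of the datasets library, so a fallback to select is kept):

# sketch of the suggestion above; falls back to select() when take() is absent
sample_size = min(MAX_SAMPLE_SIZE, len(dataset))
if hasattr(dataset, 'take'):
    sample = dataset.take(sample_size)        # avoids materializing an index range
else:
    sample = dataset.select(range(sample_size))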

}

def load_data(self, **kwargs):
    dataset = rd.read_json(self.ds_config['path'])
Collaborator:

Use RayDataset.read_json() instead to support stream reading for JSON files. Ref:

@classmethod
def read_json(cls, paths: Union[str, List[str]]) -> RayDataset:
    # Note: a temp solution for reading json stream
    # TODO: replace with ray.data.read_json_stream once it is available
    import pyarrow.json as js
    try:
        js.open_json
        return read_json_stream(paths)
    except AttributeError:
        return rd.read_json(paths)
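
If adopted, the Ray JSON load strategy above could delegate to that classmethod; a rough sketch of the change (the surrounding wiring is assumed, not taken from this PR):

# rough sketch only; RayDataset refers to the class whose read_json classmethod
# is quoted above, and how it is imported here is an assumption
def load_data(self, **kwargs):
    return RayDataset.read_json(self.ds_config['path'])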


def load_data(self, **kwargs):
    raise NotImplementedError(
        'Huggingface data load strategy is not implemented')
Collaborator:

'Huggingface data load strategy for Ray is not implemented'

@@ -86,7 +36,8 @@ def __init__(self,
                  dataset: rd.Dataset,
                  dataset_path: str = None,
                  cfg=None) -> None:
-        self.data = preprocess_dataset(dataset, dataset_path, cfg)
+        self.data = dataset
+        # self.data = preprocess_dataset(dataset, dataset_path, cfg)
Collaborator:

Is preprocess_dataset necessary? @pan-x-c

import pandas as pd
import regex as re
import requests
from bs4 import BeautifulSoup
Collaborator:

Add bs4 to the minimal requirements.


# The iterator and extractor code are in large part taken
# from the Red-Pajama repo
# https://github.com/togethercomputer/RedPajama-Data/tree/main/data_prep/arxiv
Collaborator:

# implementation of the Wikipedia dataset preparation:
# https://github.com/huggingface/datasets/blob/7e30308f49f8c85dc7a2ab5aafbff04b5d2f38e2/datasets/wikipedia/wikipedia.py

MEDIA_ALIASES = {
Collaborator:

Why not import them from datasets?

WORK_DIR = os.path.dirname(os.path.realpath(__file__))


@SKIPPED_TESTS.register_module()
Collaborator:

Add a comment describing the reason for skipping this test.
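
For example, something along these lines; the skip reason given here is only a placeholder assumption:

# Skipped in the default unit-test run because it downloads remote datasets and
# is too slow for CI (placeholder reason; replace with the actual rationale).
@SKIPPED_TESTS.register_module()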



def test_rewrite_cli_datapath_local_single_file(self):
    dataset_path = "./data/sample.txt"
Collaborator:

Building the path from the current file's directory (WORK_DIR) makes it easier for readers to trace. Ref:

data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..',
                         'data')
aud1_path = os.path.join(data_path, 'audio1.wav')  # about 6s
aud2_path = os.path.join(data_path, 'audio2.wav')  # about 14s
aud3_path = os.path.join(data_path, 'audio3.ogg')  # about 1min59s
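
Applied to the test above, the suggestion would look roughly like this (the sample file's location relative to WORK_DIR is an assumption):

# sketch of the suggestion; assumes WORK_DIR points at the test file's directory
def test_rewrite_cli_datapath_local_single_file(self):
    dataset_path = os.path.join(WORK_DIR, 'data', 'sample.txt')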

Labels: dj:core (issues/PRs about the core functions of Data-Juicer), dj:dataset (issues/PRs about the dj-dataset), enhancement (new feature or request)